package com.tomato.study.redis.stream.consumer.config;

import com.tomato.study.redis.stream.core.RedisConstants;
import com.tomato.study.redis.stream.service.RedisStreamService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;

import javax.annotation.Resource;
import java.time.Duration;
import java.util.Collections;

/**
 * 消费者组和消费者配置
 *
 * @author lizhifu
 * @date 2022/5/11
 */
@Configuration
@Slf4j
public class RedisStreamListenerConfig {
    @Resource
    private RedisStreamService redisStreamService;
    @Resource
    private StringRedisTemplate stringRedisTemplate;
    @Bean
    public StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ?> streamMessageListenerContainerOptions(){
        return StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                .builder()
                // block读取超时时间
                .pollTimeout(Duration.ofSeconds(1))
                .build();
    }

    /**
     * 开启监听器接收消息
     * @param factory
     * @param streamMessageListenerContainerOptions
     * @return
     */
    @Bean
    public StreamMessageListenerContainer streamMessageListenerContainer(RedisConnectionFactory factory,
                                                                         StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ?> streamMessageListenerContainerOptions){
        StreamMessageListenerContainer listenerContainer = StreamMessageListenerContainer.create(factory,
                streamMessageListenerContainerOptions);

        // 如果 stream 流不存在 创建 stream 流
        if(!stringRedisTemplate.hasKey(RedisConstants.REDIS_STREAM1) ){
            redisStreamService.add(RedisConstants.REDIS_STREAM1, Collections.singletonMap("", ""));
            log.info("初始化stream {} success",RedisConstants.REDIS_STREAM1);
        }else {
            log.info("stream {} 已存在",RedisConstants.REDIS_STREAM1);
        }

        // 创建消费者组
        try {
            redisStreamService.createGroup(RedisConstants.REDIS_STREAM1, RedisConstants.REDIS_STREAM1_GROUP1);
        } catch (Exception e) {
            log.info("消费者组 {} 已存在",RedisConstants.REDIS_STREAM1_GROUP1);
        }
        try {
            redisStreamService.createGroup(RedisConstants.REDIS_STREAM1, RedisConstants.REDIS_STREAM1_GROUP2);
        } catch (Exception e) {
            log.info("消费者组 {} 已存在",RedisConstants.REDIS_STREAM1_GROUP2);
        }

        listenerContainer.start();
        return listenerContainer;
    }
}
